/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.environment;



import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.concurrent.ThreadSafe;

import java.io.File;

import java.io.IOException;

import java.util.*;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/** A simple process manager which forks processes and kills them if necessary. */

@ThreadSafe

public class ProcessManager {

  private static final Logger LOG = LoggerFactory.getLogger(ProcessManager.class);



  /** For debugging purposes, we inherit I/O of processes. */

  private static final boolean INHERIT_IO = LOG.isDebugEnabled();



  /** A list of all managers to ensure all processes shutdown on JVM exit . */

  private static final List<ProcessManager> ALL_PROCESS_MANAGERS = new ArrayList<>();



  static {

    // Install a shutdown hook to ensure processes are stopped/killed.

    Runtime.getRuntime().addShutdownHook(ShutdownHook.create());

  }



  private final Map<String, Process> processes;



  public static ProcessManager create() {

    synchronized (ALL_PROCESS_MANAGERS) {

      ProcessManager processManager = new ProcessManager();

      ALL_PROCESS_MANAGERS.add(processManager);

      return processManager;

    }

  }



  private ProcessManager() {

    this.processes = Collections.synchronizedMap(new HashMap<>());

  }



  static class RunningProcess {

    private Process process;



    RunningProcess(Process process) {

      this.process = process;

    }



    /** Checks if the underlying process is still running. */

    void isAliveOrThrow() throws IllegalStateException {

      if (!process.isAlive()) {

        throw new IllegalStateException("Process died with exit code " + process.exitValue());

      }

    }



    @VisibleForTesting

    Process getUnderlyingProcess() {

      return process;

    }

  }



  /**

   * Forks a process with the given command and arguments.

   *

   * @param id A unique id for the process

   * @param command the name of the executable to run

   * @param args arguments to provide to the executable

   * @return A RunningProcess which can be checked for liveness

   */

  RunningProcess startProcess(String id, String command, List<String> args) throws IOException {

    return startProcess(id, command, args, Collections.emptyMap());

  }



  /**

   * Forks a process with the given command, arguments, and additional environment variables.

   *

   * @param id A unique id for the process

   * @param command The name of the executable to run

   * @param args Arguments to provide to the executable

   * @param env Additional environment variables for the process to be forked

   * @return A RunningProcess which can be checked for liveness

   */

  public RunningProcess startProcess(

      String id, String command, List<String> args, Map<String, String> env) throws IOException {

    checkNotNull(id, "Process id must not be null");

    checkNotNull(command, "Command must not be null");

    checkNotNull(args, "Process args must not be null");

    checkNotNull(env, "Environment map must not be null");



    ProcessBuilder pb =

        new ProcessBuilder(ImmutableList.<String>builder().add(command).addAll(args).build());

    pb.environment().putAll(env);



    if (INHERIT_IO) {

      LOG.debug(

          "==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");

      pb.inheritIO();

    } else {

      pb.redirectErrorStream(true);

      // Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE buffer

      if (System.getProperty("os.name", "").startsWith("Windows")) {

        pb.redirectOutput(new File("nul"));

      } else {

        pb.redirectOutput(new File("/dev/null"));

      }

    }



    LOG.debug("Attempting to start process with command: {}", pb.command());

    Process newProcess = pb.start();

    Process oldProcess = processes.put(id, newProcess);

    if (oldProcess != null) {

      stopProcess(id, oldProcess);

      stopProcess(id, newProcess);

      throw new IllegalStateException("There was already a process running with id " + id);

    }



    return new RunningProcess(newProcess);

  }



  /** Stops a previously started process identified by its unique id. */

  public void stopProcess(String id) {

    checkNotNull(id, "Process id must not be null");

    Process process = checkNotNull(processes.remove(id), "Process for id does not exist: " + id);

    stopProcess(id, process);

  }



  private void stopProcess(String id, Process process) {

    if (process.isAlive()) {

      LOG.debug("Attempting to stop process with id {}", id);

      // first try to kill gracefully

      process.destroy();

      long maxTimeToWait = 2000;

      if (waitForProcessToDie(process, maxTimeToWait)) {

        LOG.debug("Process for worker {} shut down gracefully.", id);

      } else {

        LOG.info("Process for worker {} still running. Killing.", id);

        process.destroyForcibly();

        if (waitForProcessToDie(process, maxTimeToWait)) {

          LOG.debug("Process for worker {} killed.", id);

        } else {

          LOG.warn("Process for worker {} could not be killed.", id);

        }

      }

    }

  }



  /** Returns true if the process exists within maxWaitTimeMillis. */

  private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {

    final long startTime = System.currentTimeMillis();

    while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {

      try {

        Thread.sleep(100);

      } catch (InterruptedException e) {

        Thread.currentThread().interrupt();

        throw new RuntimeException("Interrupted while waiting on process", e);

      }

    }

    return !process.isAlive();

  }



  private static class ShutdownHook extends Thread {



    private static ShutdownHook create() {

      return new ShutdownHook();

    }



    private ShutdownHook() {}



    @Override

    @SuppressFBWarnings("SWL_SLEEP_WITH_LOCK_HELD")

    public void run() {

      synchronized (ALL_PROCESS_MANAGERS) {

        ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);

        for (ProcessManager pm : ALL_PROCESS_MANAGERS) {

          if (pm.processes.values().stream().anyMatch(Process::isAlive)) {

            try {

              // Graceful shutdown period

              Thread.sleep(200);

              break;

            } catch (InterruptedException e) {

              Thread.currentThread().interrupt();

              throw new RuntimeException(e);

            }

          }

        }

        ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);

      }

    }

  }



  /** Stop all remaining processes gracefully, i.e. upon JVM shutdown */

  private void stopAllProcesses() {

    processes.forEach((id, process) -> process.destroy());

  }



  /** Kill all remaining processes forcibly, i.e. upon JVM shutdown */

  private void killAllProcesses() {

    processes.forEach((id, process) -> process.destroyForcibly());

  }

}